Move parallelism level for the importing process from the record level to the data chunk level #2728
Conversation
Pull Request Overview
This PR refactors the import process to parallelize at the data-chunk level rather than the record level, improving thread-pool utilization and reducing synchronization bottlenecks.
- Introduces separate executors for reading and processing chunks, coordinated via a blocking queue.
- Uses a `Semaphore` and `Phaser` to limit and track concurrent task submissions.
- Removes per-record and per-transaction thread pools, processing records sequentially within each chunk (see the sketch below).
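To make the new shape concrete, here is a minimal, self-contained sketch of the chunk-level pipeline: a single reader executor fills a bounded queue, and a fixed pool drains it chunk by chunk. Class names, queue sizes, and thread counts are illustrative assumptions, not the actual `ImportProcessor` code.

```java
import java.util.concurrent.*;
import java.util.stream.IntStream;

public class ChunkPipelineSketch {
  public static void main(String[] args) throws Exception {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>(10); // bounded buffer
    ExecutorService readerExecutor = Executors.newSingleThreadExecutor();
    ExecutorService processorExecutor = Executors.newFixedThreadPool(4);

    // Reader continuously produces data chunks into the queue.
    CompletableFuture<Void> readerFuture =
        CompletableFuture.runAsync(
            () ->
                IntStream.range(0, 100)
                    .forEach(
                        i -> {
                          try {
                            queue.put("chunk-" + i); // blocks when the buffer is full
                          } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                          }
                        }),
            readerExecutor);

    // Main loop drains the queue and hands each chunk to the processor pool;
    // records inside a chunk are handled sequentially by one worker.
    while (!(queue.isEmpty() && readerFuture.isDone())) {
      String chunk = queue.poll(100, TimeUnit.MILLISECONDS);
      if (chunk != null) {
        processorExecutor.submit(() -> System.out.println("imported " + chunk));
      }
    }
    readerExecutor.shutdown();
    processorExecutor.shutdown();
    processorExecutor.awaitTermination(1, TimeUnit.MINUTES);
  }
}
```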
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java | Split reader/processor executors; added Semaphore/Phaser; refactored chunk submission and shutdown logic. |
| data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java | Expanded tests for chunk-level parallelism; introduced TestImportProcessor for concurrency tracking. |
Comments suppressed due to low confidence (3)
data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java:102
- A `CompletionException` from `readerFuture.join()` isn't caught, so an exceptional reader task can bypass the `phaser.arriveAndAwaitAdvance()` and leave the phaser in an inconsistent state. Wrap `join()` in a try-catch for `CompletionException` (or catch `RuntimeException`) to ensure the main phaser party always deregisters.
```java
readerFuture.join();
```
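A minimal runnable sketch of the fix Copilot is suggesting; the `phaser` and `readerFuture` names mirror the diff, but the surrounding `ImportProcessor` code is assumed, not shown:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Phaser;

public class ReaderJoinSketch {
  public static void main(String[] args) {
    Phaser phaser = new Phaser(1); // main thread is the initial registered party
    CompletableFuture<Void> readerFuture =
        CompletableFuture.runAsync(() -> { throw new RuntimeException("reader failed"); });
    try {
      readerFuture.join(); // rethrows the task's exception as a CompletionException
    } catch (CompletionException e) {
      System.err.println("Reader task failed: " + e.getCause());
    } finally {
      // Always deregister the main party so the phaser can't be left waiting forever.
      phaser.arriveAndDeregister();
    }
  }
}
```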
data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java:67
- The `TableColumnDataTypes` type is not imported, causing a compilation error. Add the corresponding import for this mock type.
```java
@Mock private TableColumnDataTypes tableColumnDataTypes;
```
data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java:116
- [nitpick] In the Javadoc, backticks are used around `ExecutorService`; for consistency with project style, use `<code>ExecutorService</code>` tags instead of Markdown-style backticks.
```java
/**
 * Shuts down the given `ExecutorService` gracefully. This method attempts to cleanly shut down
```
komamitsu
left a comment
LGTM! 👍
@thongdk8 BTW, I guess there is a slight concern with the changes in this PR in the following corner case:
- There are 100 data chunks
- Only the first data chunk contains huge import rows (e.g., 1000 times larger than those of the other chunks). Yes, it's data skew
- The thread for the first chunk sequentially handles rows for a while, while the other threads quickly complete their chunks and end up waiting for it
In this case, the original implementation might work better (thanks to the record level parallelism). But, it should be a rare case and we can ignore such concern, right?
```java
        () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkReaderExecutor);

    while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
      ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);
```
This is not related to this PR, but why do we need to wake up every 100 ms? I just think a longer timeout or using `take()` might be better.
The queue is buffered, so the 100 ms timeout should be fine, I think.
We could replace it with `take()`, but that seems a bit unsafe, since `take()` would block the loop (deadlock) and we would never get a chance to recheck the while condition unless a new item arrives. For example, the queue may be empty while the reader is about to finish its job (the reader future is going to be done); this case is unlikely to happen, but it is still possible. WDYT?
Yeah, you're correct. I didn't consider the exit condition. Let's keep it as is.
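For illustration, a minimal runnable sketch of the pattern being discussed; the queue contents and reader task are stand-ins, not the real import code:

```java
import java.util.concurrent.*;

public class PollLoopSketch {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> dataChunkQueue = new LinkedBlockingQueue<>();
    CompletableFuture<Void> readerFuture =
        CompletableFuture.runAsync(() -> dataChunkQueue.add("chunk-1"));

    // poll() with a timeout wakes up periodically so the loop can recheck
    // readerFuture.isDone(); a bare take() would block forever if the reader
    // finished while the queue was empty.
    while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
      String dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);
      if (dataChunk != null) {
        System.out.println("processing " + dataChunk);
      }
    }
  }
}
```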
@komamitsu Thank you for reviewing. Regarding the case you mentioned, I agree the behavior would be as you described, but yes, it is a rare case: it happens only when the chunk size is large and close to the total number of records, and only for the last batch of chunks processed by the pool. So I think we can ignore that concern, as you said.
```java
        new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize());

    // Semaphore controls concurrent task submissions, small buffer to be two times of threads
    Semaphore taskSemaphore = new Semaphore(params.getImportOptions().getMaxThreads() * 2);
```
Instead of using a Semaphore, I think we can use an ExecutorService with a bounded blocking queue, as shown below:
```java
ExecutorService dataChunkProcessorExecutor =
    new ThreadPoolExecutor(
        params.getImportOptions().getMaxThreads(),
        params.getImportOptions().getMaxThreads(),
        0L,
        TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(params.getImportOptions().getMaxThreads() * 2),
        Executors.defaultThreadFactory(),
        new ThreadPoolExecutor.CallerRunsPolicy());
```

What do you think?
Ah yes, we could use a bounded blocking queue like you mentioned, but since it uses `CallerRunsPolicy`, when the thread pool is completely saturated (all threads busy + queue full), `CallerRunsPolicy` runs the task in the calling thread (the main thread in our case) instead of the pool threads. That is not what we expect; having all the processing tasks executed in the pool is more natural, I think.
There are other rejection policies as well, but there seems to be no suitable one unless we create a custom one, so I think using a semaphore with a fixed pool here is OK. WDYT?
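A minimal sketch of the semaphore-gated submission pattern the thread is settling on; thread counts and the task body are illustrative assumptions:

```java
import java.util.concurrent.*;

public class SemaphoreSubmitSketch {
  public static void main(String[] args) throws InterruptedException {
    int maxThreads = 4;
    ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
    // Permits bound the number of in-flight tasks to threads plus a small buffer.
    Semaphore taskSemaphore = new Semaphore(maxThreads * 2);

    for (int i = 0; i < 20; i++) {
      taskSemaphore.acquire(); // blocks the submitter when the buffer is full
      final int chunkId = i;
      pool.submit(
          () -> {
            try {
              System.out.println("processing chunk " + chunkId);
            } finally {
              taskSemaphore.release(); // free a permit once the task finishes
            }
          });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }
}
```

Unlike `CallerRunsPolicy`, the submitter simply blocks on `acquire()` when the pool is saturated, so every task still runs on a pool thread.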
brfrn169
left a comment
LGTM! Thank you!
ypeckstadt
left a comment
LGTM. Thank you.
inv-jishnu
left a comment
LGTM!
Thank you.
feeblefakie
left a comment
LGTM! Thank you!
Torch3333
left a comment
LGTM, thank you!
Description
The current import process parallelizes at the record level, which creates inefficiencies in thread-pool utilization. The main thread must wait for all record-processing threads to complete before moving to the next data chunk, leaving the thread pool underutilized while it waits for the slowest batch to finish. The current implementation is also sensitive to the `tx_size`, `chunk_size`, and `threads` parameters: for example, with `tx_size=10`, `chunk_size=30`, and `threads=4`, we don't utilize all the threads, because the chunk is split into only 3 tasks to run in the pool. The same underutilization happens whenever `chunk_size` is not divisible by `tx_size * threads`.

This PR moves parallelism from the record level to the data chunk level, allowing continuous processing of data chunks through the same thread pool using a blocking queue. This improves overall import performance. PTAL. Thank you.
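A tiny sketch of the arithmetic behind the underutilization example above; parameter names follow the description, and this is an illustration, not project code:

```java
public class TaskCountSketch {
  public static void main(String[] args) {
    int txSize = 10, chunkSize = 30, threads = 4;
    // With record-level parallelism, one chunk yields ceil(chunkSize / txSize) tasks.
    int tasksPerChunk = (chunkSize + txSize - 1) / txSize; // = 3
    System.out.printf(
        "tasks per chunk = %d, idle threads while this chunk runs = %d%n",
        tasksPerChunk, threads - tasksPerChunk);
  }
}
```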
Before:
After:
Related issues and/or PRs
NA
Changes made
Checklist
Additional notes (optional)
NA
Release notes
NA